package org.databandtech.mysql2clickhouse.sink;

import java.io.File;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Date;

import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;

import ru.yandex.clickhouse.ClickHouseStatement;
import ru.yandex.clickhouse.util.ClickHouseFormat;
import ru.yandex.clickhouse.util.ClickHouseRowBinaryStream;
import ru.yandex.clickhouse.util.ClickHouseStreamCallback;


public class SinkToClickhouse extends RichSinkFunction<Tuple5<String,Integer,String,String,String>> {

	private static final long serialVersionUID = 1L;
	Statement chStatement = null; //ClickHouseStatement chStatement;// PreparedStatement chStatement;
    Connection connection;

    String SQL;
    String URL = "";   //"jdbc:clickhouse://192.168.60.131:9000"
    String USERNAME = "";
    String PASSWORD = "";
    File CSVFILE;
    
    public SinkToClickhouse(String url, String username,String password,String sql) {
        super();
        this.SQL = sql;
        this.URL = url;
        this.USERNAME = username;
        this.PASSWORD = password;
    }

    @Override
    public void open(Configuration parameters) {
    	//https://github.com/ClickHouse/clickhouse-jdbc
        String driver = "ru.yandex.clickhouse.ClickHouseDriver";
        
        //https://github.com/housepower/ClickHouse-Native-JDBC
        //String driver = "com.github.housepower.jdbc.ClickHouseDriver";   
            
        try {
			super.open( parameters );
	        Class.forName( driver );
	        connection = DriverManager.getConnection( URL, USERNAME, PASSWORD );
	        String sql = SQL;
	        //chStatement = (ClickHouseStatement) connection.createStatement();
	        chStatement =  connection.createStatement();
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
    }

    @Override
    public void close()  {
        try {
			super.close();
			if (connection != null) {
	            connection.close();
	        }
	        if (chStatement != null) {
	        	chStatement.close();
	        }
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}       
    }

    @Override
    public void invoke(final Tuple5<String,Integer,String,String,String> value) {
    	
    	//RowBinary形式寫
//    	ClickHouseStreamCallback callback = new MyClickHouseStreamCallback(value);
//    	chStatement.write().send(SQL, callback, ClickHouseFormat.RowBinary); 
    	
    	//文件形式寫
//    	try {
//    		chStatement
//			.write() 
//			.table("default.my_table") 
//			.option("format_csv_delimiter", ";") // params
//			.data(CSVFILE) //.data(new File("/path/to/file.csv"), ClickHouseFormat.CSV)
//			.send();
//		} catch (SQLException e) {
//			// TODO Auto-generated catch block
//			e.printStackTrace();
//		}
    	
    	//InputStream形式寫 
//      chs.write().sql(SQL).data(dataInputStream).format(formatClickHouseFormat).send();
    	
    	//淘汰的寫法
//    	try {
//			chStatement.sendNativeStream(SQL, callback);
//		} catch (SQLException e) {
//			// TODO Auto-generated catch block
//			e.printStackTrace();
//		}    	

    	//housepower 写法,更像传统的jdbc。 https://blog.csdn.net/magicpenta/article/details/89515550
    	//<groupId>com.github.housepower</groupId>
        //<artifactId>clickhouse-native-jdbc</artifactId>
    	
//    	Class.forName("com.github.housepower.jdbc.ClickHouseDriver");
//    	Connection connection = DriverManager.getConnection("jdbc:clickhouse://192.168.60.131:9000");
//
//    	PreparedStatement pstmt = connection.prepareStatement("insert into test.jdbc_example values(?, ?, ?)");
//
//    	// insert 10 records
//    	for (int i = 0; i < 10; i++) {
//    	    pstmt.setDate(1, new Date(System.currentTimeMillis()));
//    	    pstmt.setString(2, "panda_" + (i + 1));
//    	    pstmt.setInt(3, 18);
//    	    pstmt.addBatch();
//    	}
//    	pstmt.executeBatch();
    	
    	try {
			chStatement.executeQuery(SQL);
		} catch (SQLException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
    }
    

    public void cancel() {

    }

}
